package org.yun.worker;

import com.google.common.annotations.Beta;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.yun.biz.dao.EventLogRepository;
import org.yun.biz.dao.SkuActivityMapRepository;
import org.yun.biz.dao.StockRepository;
import org.yun.biz.model.EventLog;
import org.yun.biz.model.SkuActivityMap;
import org.yun.biz.model.Stock;
import org.yun.util.IDUtil;
import org.yun.util.RedisUtil;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;

import static org.yun.constants.RedisConstant.STOCK;
import static org.yun.enums.EventLogEnum.*;

/**
 * @ProjectName: no-concurrent
 * @ClassName: Task
 * @Description: SyncStockTask 同步mysql的库存到redis
 * @Author: liyunfeng31
 * @Date: 2020/10/6 22:40
 */
@SuppressWarnings("UnstableApiUsage")
@Slf4j
@Component
@EnableScheduling
public class SyncStockTask {

    @Resource
    private RedisUtil redisUtil;

    @Resource
    private StockRepository stockDao;

    @Resource
    private EventLogRepository logDao;


    @Resource
    private SkuActivityMapRepository mapDao;


    /**
     * 处理数据条数阈值，大于此值将启用线程池
     */
    private static final int DATA_LIMIT = 500;


    /**
     * 带回调的任务
     */
    private static List<ListenableFuture<Integer>> FUTURES = Lists.newArrayList();

    /**
     *  创建大小为5的线程池
     */
    private static ListeningExecutorService POOL = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));;


    /**
     * 每晚1点执行库存缓存比对
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void compare(){
        try {
            List<SkuActivityMap> skuActList = mapDao.findAll();
            if(CollectionUtils.isEmpty(skuActList)){ return; }

            if(skuActList.size() <= DATA_LIMIT){
                compareAndSync(skuActList);
            }else{
                Lists.partition(skuActList, 100).forEach(this::compareAndSync);
            }
            ListenableFuture<List<Integer>> resultsFuture = Futures.allAsList(FUTURES);
            List<Integer> rs = resultsFuture.get();
            saveLog(IDUtil.id(), SYNC_STOCK_CACHE_RESULT.getCode(),rs.size()+"");
        } catch (Exception e){
            log.error("=====>>>>>使用cron  {}", LocalDateTime.now().toString());
        }
        log.info("=====>>>>>使用cron  {}",System.currentTimeMillis());
    }


    /**
     * 比较库存和缓存
     * @param skuActMaps sku-activity-map
     */
    private void compareAndSync(List<SkuActivityMap> skuActMaps){

        ListenableFuture<Integer> fd = POOL.submit(() -> {
            for (SkuActivityMap skuActMap : skuActMaps) {
                Long skuId = skuActMap.getSkuId();
                Stock dbStock = stockDao.findBySkuId(skuId);
                if(dbStock == null){
                    saveLog(skuId, SYNC_STOCK_CACHE_MISS.getCode(), SYNC_STOCK_CACHE_MISS.getName());
                    continue;
                }
                Integer validStock = dbStock.getValidStock();
                String skuAct = skuId +":"+ skuActMap.getActivityId();
                String stockKey = STOCK + skuAct;
                Object cacheStock = redisUtil.get(stockKey);
                if (!Objects.isNull(cacheStock)) {
                    int stock = (int) cacheStock;
                    if (stock == validStock) {
                        continue;
                    }
                }
                saveLog(skuId, SYNC_STOCK_CACHE_DIFF.getCode(), SYNC_STOCK_CACHE_DIFF.getName());
            }
            return 1;
        });
        FUTURES.add(fd);
    }


    /**
     * save log
     * @param key biz_id
     * @param bizCode biz_code
     * @param content content
     */
    private void saveLog(Long key, Integer bizCode, String content){
        logDao.save(new EventLog(key,bizCode,content,"sys"));
    }
}
